package com.hotitems_analysis


import java.sql.Timestamp
import java.util.Properties

import com.hotitems_analysis.bean.{ItemViewCount, UserBehavior}
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.java.tuple.{Tuple, Tuple1}
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer

/**
  * Computes the Top-N hottest (most-viewed) items per sliding window.
  *
  * @author xiao kun tai (QQ 1667847363)
  * @since 2021/11/29 23:26
  */
object HotItems {
  /**
    * Entry point: reads UserBehavior CSV records from Kafka, counts "pv"
    * events per item in 1-hour windows sliding every 5 minutes, and prints
    * the Top-5 hottest items per window.
    */
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // event-time semantics

    // Alternative source (for local testing without Kafka):
    //   env.readTextFile("HotItemsAnalysis/src/main/resources/UserBehavior.csv")
    // NOTE: do not register it alongside the Kafka source — Flink keeps every
    // created source in the stream graph, so an unused stream still executes.

    // Kafka consumer configuration
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "192.168.88.106:9092")
    properties.setProperty("group.id", "consumer-group")
    properties.setProperty("key.deserializer",
      "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer",
      "org.apache.kafka.common.serialization.StringDeserializer")
    val kafkaStream: DataStream[String] = env
      .addSource(new FlinkKafkaConsumer[String]("hotitems", new SimpleStringSchema(), properties))

    // Parse CSV lines into the UserBehavior case class and assign event-time
    // timestamps. Timestamps in the data are in seconds, Flink expects millis.
    // assignAscendingTimestamps assumes the source is strictly ordered by time.
    val dataStream: DataStream[UserBehavior] = kafkaStream.map(data => {
      val arr = data.split(",")
      UserBehavior(arr(0).toLong, arr(1).toLong, arr(2).toInt, arr(3), arr(4).toLong)
    })
      .assignAscendingTimestamps(_.timestamp * 1000L)

    // Per-item view counts over a 1h window sliding every 5min.
    // String-based keyBy is the legacy Flink API; it yields a java Tuple key.
    val aggStream: DataStream[ItemViewCount] = dataStream
      .filter(_.behavior == "pv") // keep page-view events only
      .keyBy("itemId")
      .timeWindow(Time.hours(1), Time.minutes(5))
      .aggregate(new CountAgg, new ItemViewWindowResult)

    // Regroup by window end so each window's counts can be ranked together.
    val resultStream: DataStream[String] = aggStream
      .keyBy("windowEnd")
      .process(new TopNHotItems(5))

    dataStream.print("data")
    aggStream.print("agg")
    resultStream.print("result")

    env.execute("hot items")
  }

  //自定义预聚合函数AggregateFunction
  //聚合状态就是当前商品的count值
  // Incremental pre-aggregation function: the accumulator is simply the
  // running count of elements (page views) seen so far for one item's window.
  // Using an AggregateFunction lets Flink keep one Long per window instead of
  // buffering every element.
  class CountAgg extends AggregateFunction[UserBehavior, Long, Long] {
    override def createAccumulator(): Long = 0L

    // Each incoming element contributes exactly one view.
    override def add(value: UserBehavior, accumulator: Long): Long = accumulator + 1L

    override def getResult(accumulator: Long): Long = accumulator

    // Counts from merged (session/merging) windows simply sum.
    override def merge(a: Long, b: Long): Long = a + b
  }

  //自定义窗口函数WindowFunction
  // Window function: attaches window metadata (item id from the key, window
  // end timestamp) to the pre-aggregated count, producing an ItemViewCount.
  class ItemViewWindowResult extends WindowFunction[Long, ItemViewCount, Tuple, TimeWindow] {
    override def apply(key: Tuple, window: TimeWindow, input: Iterable[Long], out: Collector[ItemViewCount]): Unit = {
      // String-based keyBy("itemId") wraps the key in a java Tuple1.
      val itemId: Long = key.asInstanceOf[Tuple1[Long]].f0
      // CountAgg hands over exactly one value per window, so the iterable
      // is a singleton.
      val viewCount: Long = input.iterator.next()
      out.collect(ItemViewCount(itemId, window.getEnd, viewCount))
    }
  }


  /**
    * Collects all per-item counts for one window (stream is keyed by
    * windowEnd), and once the window's results are complete — signalled by an
    * event-time timer at windowEnd + 1 — emits a formatted Top-N ranking.
    *
    * @param topSize number of top items to include per window
    */
  class TopNHotItems(topSize: Int) extends KeyedProcessFunction[Tuple, ItemViewCount, String] {
    // Buffers every ItemViewCount of the current window until the timer fires.
    private var itemViewCountListState: ListState[ItemViewCount] = _

    override def open(parameters: Configuration): Unit = {
      itemViewCountListState = getRuntimeContext
        .getListState(new ListStateDescriptor[ItemViewCount]("itemViewCount-list", classOf[ItemViewCount]))
    }

    override def processElement(i: ItemViewCount, context: KeyedProcessFunction[Tuple, ItemViewCount, String]#Context, collector: Collector[String]): Unit = {
      // Append each arriving count to state for ranking later.
      itemViewCountListState.add(i)
      // Fire once the watermark passes windowEnd, i.e. all of this window's
      // results have arrived. Re-registering the same timestamp is a no-op,
      // so effectively one timer exists per window.
      context.timerService().registerEventTimeTimer(i.windowEnd + 1)
    }

    override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Tuple, ItemViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {
      // Drain the list state into a local buffer so it can be sorted.
      val allItemViewCounts: ListBuffer[ItemViewCount] = ListBuffer()
      val iter = itemViewCountListState.get().iterator()
      while (iter.hasNext) {
        allItemViewCounts += iter.next()
      }

      // The window is finished — release the state promptly.
      itemViewCountListState.clear()

      // Sort descending by count and keep the top N.
      val sortedItemViewCounts = allItemViewCounts
        .sortBy(_.count)(Ordering.Long.reverse)
        .take(topSize)

      // Format the ranking into a single printable string.
      val result: StringBuilder = new StringBuilder
      // Timer fired at windowEnd + 1, so subtract 1 to recover the window end.
      result.append("窗口结束时间：").append(new Timestamp(timestamp - 1)).append("\n")

      for (i <- sortedItemViewCounts.indices) {
        val currentItemViewCount = sortedItemViewCounts(i)
        result
          .append("NO: ").append(i + 1).append("\t")
          .append("商品ID = ").append(currentItemViewCount.itemId).append("\t")
          .append("热门度 = ").append(currentItemViewCount.count).append("\n")
      }
      result.append("===============================================\n")

      // Fixed: removed Thread.sleep(1000) that was here to slow console
      // output for demos — sleeping in onTimer blocks the task thread and
      // stalls the entire pipeline (checkpointing, other timers, records).
      out.collect(result.toString())
    }
  }

}
